package com.hzzftech.watchdog.busi.core.executor;

import com.hzzftech.watchdog.busi.config.KettleConfig;
import com.hzzftech.watchdog.busi.constants.BusiConstant;
import com.hzzftech.watchdog.busi.core.dance.JobChangeState;
import com.hzzftech.watchdog.busi.core.executor.utils.TransExecuteUtil;
import com.hzzftech.watchdog.busi.core.util.KettleImgUtil;
import com.hzzftech.watchdog.busi.domain.*;
import com.hzzftech.watchdog.busi.service.*;
import com.hzzftech.watchdog.busi.service.impl.KtDispatcherStepsServiceImpl;
import com.hzzftech.watchdog.busi.service.impl.KtExecutionLogServiceImpl;
import com.hzzftech.watchdog.busi.utils.TaskManageUtil;
import com.hzzftech.watchdog.common.core.domain.entity.SysUser;
import com.hzzftech.watchdog.common.utils.StringUtils;
import com.hzzftech.watchdog.common.utils.spring.SpringUtils;
import com.hzzftech.watchdog.common.utils.uuid.IdUtils;
import com.hzzftech.watchdog.system.service.ISysUserService;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepStatus;
import org.pentaho.di.www.SlaveServerTransStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;

public class TransExecutor implements Callable<String> {

    public static Logger logger = LoggerFactory.getLogger(TransExecutor.class);

    private String logId;
    private TransMeta meta;
    private TransExecutionConfiguration configuration;

    private String key;

    private String carteId;

    private boolean clickTtop;

    private String dpRepoType;

    private KtExecutionLog log;

    private Long jobId;

    /**
     * @param configuration 执行配置
     * @param meta TRANS元数据
     * @param dpRepoType 仓库类型 0 git资源库 1 文件资源库 2 数据库资源库
     */
    public TransExecutor(TransExecutionConfiguration configuration, TransMeta meta, String dpRepoType,Long jobId) {
        this.configuration = configuration;
        this.meta = meta;
        this.dpRepoType = dpRepoType;
        this.jobId = jobId;
    }

    public TransExecutor(TransExecutionConfiguration configuration, TransMeta meta, String logId,String dpRepoType) {
        this.configuration = configuration;
        this.meta = meta;
        this.logId = logId;
        this.dpRepoType = dpRepoType;
    }

    @Override
    public String call() throws Exception {
        IKtExecutionLogService logService = SpringUtils.getBean(KtExecutionLogServiceImpl.class);
        IKtJobService jobService = SpringUtils.getBean(IKtJobService.class);
        KettleConfig kettleConfig = SpringUtils.getBean(KettleConfig.class);
        boolean insertFlag = false;
        if (StringUtils.isEmpty(logId)) {
            logId = IdUtils.fastUUID();
            logger.info("trans id: "+logId);
            log = new KtExecutionLog();
            log.setDpRepoType(dpRepoType);
            log.setId(logId);
            insertFlag = true;
        } else {
            log = logService.selectKtExecutionLogById(logId);
        }

        key = BusiConstant.KEY_PREFFIX+logId;
        try {
            TaskManageUtil.getInstance().add(key, this);
            int status = DispatcherStatus.STATUS_RUNNING.getStatus();
            // 执行日志
            log.setStartTime(new Date());
            log.setStatus(String.valueOf(status));
            log.setTaskName(meta.getName());
            log.setTaskType(BusiConstant.KETTLE_TYPE_FLAG_TRANS);
            log.setTaskId(Long.valueOf(meta.getObjectId().getId()));

            String transImg = KettleImgUtil.getTransImg(meta, kettleConfig.getGIT_FILE_REPOSITORY_LOG_FILE()+ File.separator+"trans"+File.separator);
            log.setImagePath(transImg);

            log.setExecutionMethod("远程："+configuration.getRemoteServer().getHostname());
            log.setSlaveId(Long.parseLong(configuration.getRemoteServer().getObjectId().getId()));

            if (jobId != null) {
                KtJob ktJob = jobService.selectJobById(jobId);
                if (ktJob != null) {
                    log.setJobId(jobId);
                    log.setJobName(ktJob.getJobName());
                }
            }

            // 发送远程执行
            carteId = TransExecuteUtil.sendToSlaveServer(meta, configuration,null);

            logger.info("trans carte obj id: "+carteId);
            log.setCarteId(carteId);
            logger.info("[transExecutor] 发送任务至carte服务,任务名称[{}]", meta.getName());
        } catch (Exception e) {
            logger.error("执行任务失败",e);
            log.setStatus(String.valueOf(DispatcherStatus.STATUS_FAIL.getStatus()));
            if (e instanceof KettleException) {
                log.setRemark("发送远程服务失败，调度ID："+meta.getObjectId().getId()+"，错误信息："+((KettleException)e).getSuperMessage());
            } else{
                log.setRemark("发送远程服务失败, 调度ID："+meta.getObjectId().getId());
            }
            log.setEndTime(new Date());
            IKtDispatcherService dispatcherService = SpringUtils.getBean(IKtDispatcherService.class);
            KtDispatcher dispatcher = dispatcherService.selectKtDispatcherById(Long.valueOf(meta.getObjectId().getId()));

            if (StringUtils.isNotEmpty(dispatcher.getWarnMsgIds() )) {
                IKtDispatcherFailedMsgService failedMsgService = SpringUtils.getBean(IKtDispatcherFailedMsgService.class);
                ISysUserService userService = SpringUtils.getBean(ISysUserService.class);
                for (String id : dispatcher.getWarnMsgIds().split(",")) {
                    KtDispatcherFailedMsg msg = new KtDispatcherFailedMsg();
                    msg.setDpId(Long.valueOf(meta.getObjectId().getId()));
                    msg.setFailedTime(new Date());
                    msg.setSendTime(0L);
                    msg.setStatus(BusiConstant.STATUS_YES);
                    msg.setDpName(dispatcher.getDpName());
                    msg.setUserId(Long.parseLong(id));
                    SysUser user = userService.selectUserById(Long.parseLong(id));
                    msg.setUserName(user.getUserName());
                    msg.setEmail(user.getEmail());
                    msg.setPhone(user.getPhonenumber());
                    failedMsgService.insertKtDispatcherFailedMsg(msg);
                }
            }
        }   finally {
            if (insertFlag) {
                logService.insertKtExecutionLog(log);
            } else {
                logService.updateKtExecutionLog(log);
            }
            JobChangeState.instance.jobStateChange();
            try {
                getRnStatus(logId);
            } catch (Exception e) {
                logger.error("",e);
            }
        }
        return "Trans End";
    }

    /**
     * 获取执行步骤
     * @param logId
     * @throws Exception
     */
    private void getRnStatus(String logId) throws Exception {
        SlaveServer remoteSServer = configuration.getRemoteServer();
        SlaveServerTransStatus status = remoteSServer.getTransStatus(meta.getName(), carteId, 0);

        List<StepStatus> stepStatuses = status.getStepStatusList();
        if (stepStatuses != null && stepStatuses.size() > 0) {
            IKtDispatcherStepsService stepsService = SpringUtils.getBean(KtDispatcherStepsServiceImpl.class);
            for (StepStatus stepStatus: stepStatuses) {
                KtDispatcherSteps kDisStepStatus = getKDisStepStatus(stepStatus);
                kDisStepStatus.setTaskId(logId);
                kDisStepStatus.setId(IdUtils.simpleUUID().toUpperCase());
                stepsService.insertKtDispatcherSteps(kDisStepStatus);
            }
        }
    }

    /**
     * 获取执行步骤
     */
    private KtDispatcherSteps getKDisStepStatus(StepStatus stepStatus) {
        KtDispatcherSteps steps = new KtDispatcherSteps();
        steps.setCopy(Long.parseLong(stepStatus.getCopy()+""));
        steps.setLinesRead(stepStatus.getLinesRead());
        steps.setStepName(stepStatus.getStepname());
        steps.setLinesWritten(stepStatus.getLinesWritten());
        steps.setLinesInput(stepStatus.getLinesInput());
        steps.setLinesOutput(stepStatus.getLinesOutput());
        steps.setLinesUpdated(stepStatus.getLinesUpdated());
        steps.setLinesRejected(stepStatus.getLinesRejected());
        steps.setLogErrors(stepStatus.getErrors());
        steps.setStatusDescription(stepStatus.getStatusDescription());
        steps.setLogSeconds(new Double(stepStatus.getSeconds()).longValue());
        steps.setSpeed(stepStatus.getSpeed());
        steps.setLogPriortty(stepStatus.getPriority());
        steps.setStopped(stepStatus.isStopped() ? "N": "Y");
        steps.setPaused(stepStatus.isPaused() ? "N": "Y");

        return steps;
    }

    // 停止trans 比较危险
    public void stop() {
//        if (trans != null) {
//            trans.stopAll();
//        }
    }

    // 暂停任务
    public void pause() {
//        if (!trans.isStopped()) {
//            trans.pauseRunning();
//        } else {
//            trans.resumeRunning();
//        }
    }

    public void setClickStop(boolean clickTtop) {
        this.clickTtop = clickTtop;
    }

    // 获取执行日志
    public String getExecutionLog() throws Exception {
        SlaveServer slaveServer = configuration.getRemoteServer();
        SlaveServerTransStatus transStatus = slaveServer.getTransStatus(meta.getName(), carteId, 0);
        return transStatus.getLoggingString();
    }
}
